From ef071736546681facfdc36617632bdafd6c4d4a4 Mon Sep 17 00:00:00 2001 From: "cl349@firebug.cl.cam.ac.uk" Date: Tue, 26 Jul 2005 15:20:09 +0000 Subject: [PATCH] Change watches: operations block until everyone has acked. Watch events are no longer sent to self. Watches no longer take a priority. async and asyncwait commands for xs_test, now we need to continue despite blocking ops. Print test name at end of verbose run on failure. Use --trace-file arg to xenstored when testing. Signed-off-by: Rusty Russel Signed-off-by: Christian Limpach --- tools/xenstore/TODO | 5 +- tools/xenstore/testsuite/07watch.sh | 104 ++++--- tools/xenstore/testsuite/08transaction.sh | 16 +- tools/xenstore/testsuite/10domain-homedir.sh | 4 +- tools/xenstore/testsuite/11domain-watch.sh | 28 +- tools/xenstore/testsuite/12readonly.sh | 7 +- tools/xenstore/testsuite/13watch-ack.sh | 9 +- tools/xenstore/testsuite/test.sh | 6 +- tools/xenstore/xenstored_core.c | 53 +++- tools/xenstore/xenstored_core.h | 12 + tools/xenstore/xenstored_transaction.c | 39 +-- tools/xenstore/xenstored_watch.c | 291 ++++++------------- tools/xenstore/xenstored_watch.h | 6 +- tools/xenstore/xs.c | 10 +- tools/xenstore/xs.h | 4 +- tools/xenstore/xs_test.c | 275 +++++++++++------- 16 files changed, 443 insertions(+), 426 deletions(-) diff --git a/tools/xenstore/TODO b/tools/xenstore/TODO index 8e4185b211..71d5bbbf50 100644 --- a/tools/xenstore/TODO +++ b/tools/xenstore/TODO @@ -2,8 +2,9 @@ TODO in no particular order. Some of these will never be done. There are omissions of important but necessary things. It is up to the reader to fill in the blanks. -- Remove calls to system() from daemon - Timeout failed watch responses -- Dynamic nodes +- Dynamic/supply nodes - Persistant storage of introductions, watches and transactions, so daemon can restart - Remove assumption that rename doesn't fail +- Multi-root transactions, for setting up front and back ends at same time. 
+ diff --git a/tools/xenstore/testsuite/07watch.sh b/tools/xenstore/testsuite/07watch.sh index 88496d55e9..00af679a29 100644 --- a/tools/xenstore/testsuite/07watch.sh +++ b/tools/xenstore/testsuite/07watch.sh @@ -3,45 +3,52 @@ # Watch something, write to it, check watch has fired. [ "`echo -e 'write /test create contents' | ./xs_test 2>&1`" = "" ] -[ "`echo -e '1 watch /test token 100 -2 write /test create contents2 +[ "`echo -e '1 watch /test token +2 async write /test create contents2 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test:token" ] # Check that reads don't set it off. -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 read /test 1 waitwatch' | ./xs_test 2>&1`" = "2:contents2 1:waitwatch timeout" ] # mkdir, setperm and rm should (also tests watching dirs) [ "`echo -e 'mkdir /dir' | ./xs_test 2>&1`" = "" ] -[ "`echo -e '1 watch /dir token 100 -2 mkdir /dir/newdir +[ "`echo -e '1 watch /dir token +2 async mkdir /dir/newdir 1 waitwatch 1 ackwatch token -2 setperm /dir/newdir 0 READ +asyncwait +2 async setperm /dir/newdir 0 READ 1 waitwatch 1 ackwatch token -2 rm /dir/newdir +asyncwait +2 async rm /dir/newdir 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/dir/newdir:token 1:/dir/newdir:token 1:/dir/newdir:token" ] +# We don't get a watch from our own commands. +[ "`echo -e 'watch /dir token +mkdir /dir/newdir +waitwatch' | ./xs_test 2>&1`" = "waitwatch timeout" ] + # ignore watches while doing commands, should work. -[ "`echo -e 'watch /dir token 100 -write /dir/test create contents +[ "`echo -e 'watch /dir token +1 async write /dir/test create contents read /dir/test waitwatch ackwatch token' | ./xs_test 2>&1`" = "contents /dir/test:token" ] -# watch priority /test. 
-[ "`echo -e '1 watch /dir token1 1 -3 watch /dir token3 3 -2 watch /dir token2 2 -write /dir/test create contents +# watch priority test: all simultaneous +[ "`echo -e '1 watch /dir token1 +3 watch /dir token3 +2 watch /dir token2 +async write /dir/test create contents 3 waitwatch 3 ackwatch token3 2 waitwatch @@ -52,9 +59,9 @@ write /dir/test create contents 1:/dir/test:token1" ] # If one dies (without acking), the other should still get ack. -[ "`echo -e '1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +2 watch /dir token2 +async write /dir/test create contents 2 waitwatch 2 close 1 waitwatch @@ -62,51 +69,52 @@ write /dir/test create contents 1:/dir/test:token1" ] # If one dies (without reading at all), the other should still get ack. -[ "`echo -e '1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +2 watch /dir token2 +async write /dir/test create contents 2 close 1 waitwatch 1 ackwatch token1' | ./xs_test 2>&1`" = "1:/dir/test:token1" ] # unwatch -[ "`echo -e '1 watch /dir token1 0 +[ "`echo -e '1 watch /dir token1 1 unwatch /dir token1 -1 watch /dir token2 0 -2 write /dir/test2 create contents +1 watch /dir token2 +2 async write /dir/test2 create contents 1 waitwatch 1 unwatch /dir token2' | ./xs_test 2>&1`" = "1:/dir/test2:token2" ] # unwatch while watch pending. Next watcher gets the event. -[ "`echo -e '1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +2 watch /dir token2 +async write /dir/test create contents 2 unwatch /dir token2 1 waitwatch 1 ackwatch token1' | ./xs_test 2>&1`" = "1:/dir/test:token1" ] # unwatch while watch pending. Should clear this so we get next event. 
-[ "`echo -e '1 watch /dir token1 0 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +async write /dir/test create contents 1 unwatch /dir token1 -1 watch /dir/test token2 0 -write /dir/test none contents2 +1 watch /dir/test token2 +asyncwait +async write /dir/test none contents2 1 waitwatch 1 ackwatch token2' | ./xs_test 2>&1`" = "1:/dir/test:token2" ] # check we only get notified once. -[ "`echo -e '1 watch /test token 100 -2 write /test create contents2 +[ "`echo -e '1 watch /test token +2 async write /test create contents2 1 waitwatch 1 ackwatch token 1 waitwatch' | ./xs_test 2>&1`" = "1:/test:token 1:waitwatch timeout" ] # watches are queued in order. -[ "`echo -e '1 watch / token 100 -2 write /test1 create contents -2 write /test2 create contents -2 write /test3 create contents +[ "`echo -e '1 watch / token +async 2 write /test1 create contents +async 2 write /test2 create contents +async 2 write /test3 create contents 1 waitwatch 1 ackwatch token 1 waitwatch @@ -117,9 +125,9 @@ write /dir/test none contents2 1:/test3:token" ] # Creation of subpaths should be covered correctly. -[ "`echo -e '1 watch / token 100 -2 write /test/subnode create contents2 -2 write /test/subnode/subnode create contents2 +[ "`echo -e '1 watch / token +2 async write /test/subnode create contents2 +2 async write /test/subnode/subnode create contents2 1 waitwatch 1 ackwatch token 1 waitwatch @@ -129,23 +137,23 @@ write /dir/test none contents2 1:waitwatch timeout" ] # Watch event must have happened before we registered interest. -[ "`echo -e '1 watch / token 100 -2 write /test/subnode create contents2 -2 watch / token2 0 +[ "`echo -e '1 watch / token +2 async write /test/subnode create contents2 +1 watch / token2 0 1 waitwatch 1 ackwatch token -2 waitwatch' | ./xs_test 2>&1`" = "1:/test/subnode:token -2:waitwatch timeout" ] +1 waitwatch' | ./xs_test 2>&1`" = "1:/test/subnode:token +1:waitwatch timeout" ] # Rm fires notification on child. 
-[ "`echo -e '1 watch /test/subnode token 100 -2 rm /test +[ "`echo -e '1 watch /test/subnode token +2 async rm /test 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/subnode:token" ] # Watch should not double-send after we ack, even if we did something in between. -[ "`echo -e '1 watch /test2 token 100 -2 write /test2/foo create contents2 +[ "`echo -e '1 watch /test2 token +2 async write /test2/foo create contents2 1 waitwatch 1 read /test2/foo 1 ackwatch token diff --git a/tools/xenstore/testsuite/08transaction.sh b/tools/xenstore/testsuite/08transaction.sh index c5311ed822..4c786df687 100644 --- a/tools/xenstore/testsuite/08transaction.sh +++ b/tools/xenstore/testsuite/08transaction.sh @@ -45,37 +45,37 @@ echo write /test/entry1 create contents | ./xs_test sleep 1 rm /test/entry1 commit -dir /test' | ./xs_test`" = "" ] +dir /test' | ./xs_test --no-timeout`" = "" ] # ... as long as noone is waiting. [ "`echo -e '1 start /test 2 mkdir /test/dir 1 mkdir /test/dir 1 dir /test -1 commit' | ./xs_test 2>&1`" = "1:dir +1 commit' | ./xs_test --no-timeout 2>&1`" = "1:dir FATAL: 1: commit: Connection timed out" ] # Events inside transactions don't trigger watches until (successful) commit. -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 start /test 2 mkdir /test/dir/sub 1 waitwatch' | ./xs_test 2>&1`" = "1:waitwatch timeout" ] -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 start /test 2 mkdir /test/dir/sub 2 abort 1 waitwatch' | ./xs_test 2>&1`" = "1:waitwatch timeout" ] -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 start /test 2 mkdir /test/dir/sub -2 commit +2 async commit 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/dir/sub:token" ] # Rm inside transaction works like rm outside: children get notified. 
-[ "`echo -e '1 watch /test/dir/sub token 100 +[ "`echo -e '1 watch /test/dir/sub token 2 start /test 2 rm /test/dir -2 commit +2 async commit 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/dir/sub:token" ] diff --git a/tools/xenstore/testsuite/10domain-homedir.sh b/tools/xenstore/testsuite/10domain-homedir.sh index 398adb562c..39f03b2ebc 100644 --- a/tools/xenstore/testsuite/10domain-homedir.sh +++ b/tools/xenstore/testsuite/10domain-homedir.sh @@ -13,8 +13,8 @@ entry1" ] # Place a watch using a relative path: expect relative answer. [ "`echo 'introduce 1 100 7 /home 1 mkdir foo -1 watch foo token 0 -write /home/foo/bar create contents +1 watch foo token +async write /home/foo/bar create contents 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "handle is 1 1:foo/bar:token" ] diff --git a/tools/xenstore/testsuite/11domain-watch.sh b/tools/xenstore/testsuite/11domain-watch.sh index f42fb5f8c6..6793244bca 100644 --- a/tools/xenstore/testsuite/11domain-watch.sh +++ b/tools/xenstore/testsuite/11domain-watch.sh @@ -6,42 +6,46 @@ [ "`echo -e 'mkdir /dir' | ./xs_test 2>&1`" = "" ] [ "`echo -e 'introduce 1 100 7 /my/home -1 watch /test token 100 -write /test create contents2 +1 watch /test token +async write /test create contents2 1 waitwatch 1 ackwatch token 1 unwatch /test token +asyncwait release 1' | ./xs_test 2>&1`" = "handle is 1 1:/test:token" ] # ignore watches while doing commands, should work. 
[ "`echo -e 'introduce 1 100 7 /my/home -1 watch /dir token 100 -1 write /dir/test create contents -1 read /dir/test +1 watch /dir token +async write /dir/test create contents +1 write /dir/test2 create contents2 +1 write /dir/test3 create contents3 +1 write /dir/test4 create contents4 1 waitwatch 1 ackwatch token +asyncwait release 1' | ./xs_test 2>&1`" = "handle is 1 -1:contents 1:/dir/test:token" ] # unwatch [ "`echo -e 'introduce 1 100 7 /my/home -1 watch /dir token1 0 +1 watch /dir token1 1 unwatch /dir token1 -1 watch /dir token2 0 -2 write /dir/test2 create contents +1 watch /dir token2 +async 2 write /dir/test2 create contents 1 waitwatch 1 unwatch /dir token2 +asyncwait release 1' | ./xs_test 2>&1`" = "handle is 1 1:/dir/test2:token2" ] # unwatch while watch pending. [ "`echo -e 'introduce 1 100 7 /my/home introduce 2 101 8 /my/secondhome -1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +1 watch /dir token1 +2 watch /dir token2 +3 async write /dir/test create contents 2 unwatch /dir token2 1 waitwatch 1 ackwatch token1 diff --git a/tools/xenstore/testsuite/12readonly.sh b/tools/xenstore/testsuite/12readonly.sh index bfe6273fe1..646d95b32a 100644 --- a/tools/xenstore/testsuite/12readonly.sh +++ b/tools/xenstore/testsuite/12readonly.sh @@ -9,7 +9,7 @@ tool" ] [ "`echo 'read /test getperm /test -watch /test token 0 +watch /test token unwatch /test token start / commit @@ -27,7 +27,7 @@ abort' | ./xs_test --readonly 2>&1`" = "contents # Check that watches work like normal. set -m -[ "`echo 'watch / token 0 +[ "`echo 'watch / token waitwatch ackwatch token' | ./xs_test --readonly 2>&1`" = "/test:token" ] & @@ -36,6 +36,3 @@ if wait; then :; else echo Readonly wait test failed: $? 
exit 1 fi - - - diff --git a/tools/xenstore/testsuite/13watch-ack.sh b/tools/xenstore/testsuite/13watch-ack.sh index 70a7c6fa3e..30125fa6af 100644 --- a/tools/xenstore/testsuite/13watch-ack.sh +++ b/tools/xenstore/testsuite/13watch-ack.sh @@ -15,8 +15,9 @@ echo mkdir /test/3 | ./xs_test [ "`echo '1 watch /test/1 token1 0 1 watch /test/2 token2 0 1 watch /test/3 token3 0 -2 write /test/2 create contents2 +2 async write /test/2 create contents2 1 waitwatch -2 write /test/1 create contents1 -2 write /test/3 create contents3 -1 ackwatch token2' | ./xs_test 2>&1`" = "1:/test/2:token2" ] +3 async write /test/1 create contents1 +4 async write /test/3 create contents3 +1 ackwatch token2 +1 close' | ./xs_test 2>&1`" = "1:/test/2:token2" ] diff --git a/tools/xenstore/testsuite/test.sh b/tools/xenstore/testsuite/test.sh index 3f0055842d..4cd550a28e 100755 --- a/tools/xenstore/testsuite/test.sh +++ b/tools/xenstore/testsuite/test.sh @@ -9,7 +9,7 @@ run_test() mkdir $XENSTORED_ROOTDIR # Weird failures with this. if type valgrind >/dev/null 2>&1; then - valgrind -q --logfile-fd=3 ./xenstored_test --output-pid --no-fork 3>testsuite/tmp/vgout > /tmp/pid 2> testsuite/tmp/xenstored_errors & + valgrind -q --logfile-fd=3 ./xenstored_test --output-pid --trace-file=testsuite/tmp/trace --no-fork 3>testsuite/tmp/vgout > /tmp/pid 2> testsuite/tmp/xenstored_errors & while [ ! -s /tmp/pid ]; do sleep 0; done PID=`cat /tmp/pid` rm /tmp/pid @@ -38,7 +38,9 @@ for f in testsuite/[0-9]*.sh; do echo Test $f passed... else echo Test $f failed, running verbosely... - run_test $f -x + run_test $f -x || true + # That will have filled the screen, repeat message. 
+ echo Test $f failed exit 1 fi done diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c index f6f6e71cfc..386d323d37 100644 --- a/tools/xenstore/xenstored_core.c +++ b/tools/xenstore/xenstored_core.c @@ -51,7 +51,7 @@ #include "xenstored_domain.h" static bool verbose; -static LIST_HEAD(connections); +LIST_HEAD(connections); static int tracefd = -1; #ifdef TESTING @@ -959,8 +959,11 @@ static void do_write(struct connection *conn, struct buffered_data *in) } add_change_node(conn->transaction, node, false); + if (fire_watches(conn, node, false)) { + conn->watch_ack = XS_WRITE; + return; + } send_ack(conn, XS_WRITE); - fire_watches(conn->transaction, node, false); } static void do_mkdir(struct connection *conn, const char *node) @@ -985,8 +988,11 @@ static void do_mkdir(struct connection *conn, const char *node) } add_change_node(conn->transaction, node, false); + if (fire_watches(conn, node, false)) { + conn->watch_ack = XS_MKDIR; + return; + } send_ack(conn, XS_MKDIR); - fire_watches(conn->transaction, node, false); } static void do_rm(struct connection *conn, const char *node) @@ -1023,8 +1029,11 @@ static void do_rm(struct connection *conn, const char *node) } add_change_node(conn->transaction, node, true); + if (fire_watches(conn, node, true)) { + conn->watch_ack = XS_RM; + return; + } send_ack(conn, XS_RM); - fire_watches(conn->transaction, node, true); } static void do_get_perms(struct connection *conn, const char *node) @@ -1095,8 +1104,11 @@ static void do_set_perms(struct connection *conn, struct buffered_data *in) } add_change_node(conn->transaction, node, false); + if (fire_watches(conn, node, false)) { + conn->watch_ack = XS_SET_PERMS; + return; + } send_ack(conn, XS_SET_PERMS); - fire_watches(conn->transaction, node, false); } /* Process "in" for conn: "in" will vanish after this conversation, so @@ -1321,14 +1333,23 @@ static void unblock_connections(void) struct connection *i, *tmp; list_for_each_entry_safe(i, tmp, 
&connections, list) { - if (i->state == OK) - continue; - - if (!transaction_covering_node(i->blocked_by)) { - talloc_free(i->blocked_by); - i->blocked_by = NULL; - i->state = OK; - consider_message(i); + switch (i->state) { + case BLOCKED: + if (!transaction_covering_node(i->blocked_by)) { + talloc_free(i->blocked_by); + i->blocked_by = NULL; + i->state = OK; + consider_message(i); + } + break; + case WATCHED: + if (i->watches_unacked == 0) { + i->state = OK; + send_ack(i, i->watch_ack); + } + break; + case OK: + break; } } @@ -1351,6 +1372,8 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read) new->state = OK; new->blocked_by = NULL; + new->watch_ack = XS_ERROR; + new->watches_unacked = 0; new->out = new->waiting_reply = NULL; new->fd = -1; new->id = 0; @@ -1359,6 +1382,7 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read) new->write = write; new->read = read; new->can_write = true; + INIT_LIST_HEAD(&new->watches); talloc_set_fail_handler(out_of_mem, &talloc_fail); if (setjmp(talloc_fail)) { @@ -1430,13 +1454,12 @@ void dump_connection(void) printf(" state = %s\n", i->state == OK ? "OK" : i->state == BLOCKED ? "BLOCKED" + : i->state == WATCHED ? "WATCHED" : "INVALID"); if (i->id) printf(" id = %i\n", i->id); if (i->blocked_by) printf(" blocked on = %s\n", i->blocked_by); - if (i->waiting_for_ack) - printf(" waiting_for_ack TRUE\n"); if (!i->in->inhdr || i->in->used) printf(" got %i bytes of %s\n", i->in->used, i->in->inhdr ? "header" : "data"); diff --git a/tools/xenstore/xenstored_core.h b/tools/xenstore/xenstored_core.h index 75a9bfe0a6..61d47b5342 100644 --- a/tools/xenstore/xenstored_core.h +++ b/tools/xenstore/xenstored_core.h @@ -51,6 +51,8 @@ enum state { /* Blocked by transaction. 
*/ BLOCKED, + /* Waiting for watchers to ack event we caused */ + WATCHED, /* Completed */ OK, }; @@ -71,6 +73,12 @@ struct connection /* Node we are waiting for (if state == BLOCKED) */ char *blocked_by; + /* Are we waiting for watches to be acked from an event we caused? */ + unsigned int watches_unacked; + + /* Type of ack to send once watches fired. */ + enum xsd_sockmsg_type watch_ack; + /* Is this a read-only connection? */ bool can_write; @@ -92,10 +100,14 @@ struct connection /* The domain I'm associated with, if any. */ struct domain *domain; + /* My watches. */ + struct list_head watches; + /* Methods for communicating over this connection: write can be NULL */ connwritefn_t *write; connreadfn_t *read; }; +extern struct list_head connections; /* Return length of string (including nul) at this offset. */ unsigned int get_string(const struct buffered_data *data, diff --git a/tools/xenstore/xenstored_transaction.c b/tools/xenstore/xenstored_transaction.c index 60dcf04130..afaef1bef2 100644 --- a/tools/xenstore/xenstored_transaction.c +++ b/tools/xenstore/xenstored_transaction.c @@ -288,7 +288,6 @@ void do_transaction_start(struct connection *conn, const char *node) static bool commit_transaction(struct transaction *trans) { char *tmp, *dir; - struct changed_node *i; /* Move: orig -> .old, repl -> orig. Cleanup deletes .old. */ dir = node_dir_outside_transaction(trans->node); @@ -301,15 +300,15 @@ static bool commit_transaction(struct transaction *trans) trans->divert, dir); trans->divert = tmp; - - /* Fire off the watches for everything that changed. 
*/ - list_for_each_entry(i, &trans->changes, list) - fire_watches(NULL, i->node, i->recurse); return true; } void do_transaction_end(struct connection *conn, const char *arg) { + struct changed_node *i; + struct transaction *trans; + bool fired = false; + if (!arg || (!streq(arg, "T") && !streq(arg, "F"))) { send_error(conn, EINVAL); return; @@ -320,24 +319,30 @@ void do_transaction_end(struct connection *conn, const char *arg) return; } + /* Set to NULL so fire_watches sends events. */ + trans = conn->transaction; + conn->transaction = NULL; + /* Attach transaction to arg for auto-cleanup */ + talloc_steal(arg, trans); + if (streq(arg, "T")) { - if (conn->transaction->destined_to_fail) { + if (trans->destined_to_fail) { send_error(conn, ETIMEDOUT); - goto failed; + return; } - if (!commit_transaction(conn->transaction)) { + if (!commit_transaction(trans)) { send_error(conn, errno); - goto failed; + return; } - } - talloc_free(conn->transaction); - conn->transaction = NULL; - send_ack(conn, XS_TRANSACTION_END); - return; + /* Fire off the watches for everything that changed. */ + list_for_each_entry(i, &trans->changes, list) + fired |= fire_watches(conn, i->node, i->recurse); + } -failed: - talloc_free(conn->transaction); - conn->transaction = NULL; + if (fired) + conn->watch_ack = XS_TRANSACTION_END; + else + send_ack(conn, XS_TRANSACTION_END); } diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c index 205b70399c..c532da26a8 100644 --- a/tools/xenstore/xenstored_watch.c +++ b/tools/xenstore/xenstored_watch.c @@ -33,69 +33,39 @@ #include "xenstored_domain.h" /* FIXME: time out unacked watches. */ - -/* We create this if anyone is interested "node", then we pass it from - * watch to watch as each connection acks it. - */ struct watch_event { - /* The watch we are firing for (watch->events) */ + /* The events on this watch. */ struct list_head list; - /* Watches we need to fire for (watches[0]->events == this). 
*/ - struct watch **watches; - unsigned int num_watches; - - struct timeval timeout; - - /* Name of node which changed. */ - char *node; + /* Data to send (node\0token\0). */ + unsigned int len; + char *data; - /* For remove, we trigger on all the children of this node too. */ - bool recurse; + /* Connection which caused watch event (which we are blocking) */ + struct connection *cause; }; struct watch { + /* Watches on this connection */ struct list_head list; - unsigned int priority; /* Current outstanding events applying to this watch. */ struct list_head events; /* Is this relative to connnection's implicit path? */ - bool relative; + const char *relative_path; char *token; char *node; - struct connection *conn; }; -static LIST_HEAD(watches); - -static struct watch_event *get_first_event(struct connection *conn) -{ - struct watch *watch; - struct watch_event *event; - - /* Find first watch with an event. */ - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - - event = list_top(&watch->events, struct watch_event, list); - if (event) - return event; - } - return NULL; -} /* Look through our watches: if any of them have an event, queue it. */ void queue_next_event(struct connection *conn) { struct watch_event *event; - const char *node; - char *buffer; - unsigned int len; + struct watch *watch; /* We had a reply queued already? Send it: other end will * discard watch. */ @@ -110,170 +80,93 @@ void queue_next_event(struct connection *conn) if (conn->waiting_for_ack) return; - event = get_first_event(conn); - if (!event) - return; - - /* If we decide to cancel, we will reset this. */ - conn->waiting_for_ack = event->watches[0]; - - /* If we deleted /foo and they're watching /foo/bar, that's what we - * tell them has changed. 
*/ - if (!is_child(event->node, event->watches[0]->node)) { - assert(event->recurse); - node = event->watches[0]->node; - } else - node = event->node; - - /* If watch placed using relative path, give them relative answer. */ - if (event->watches[0]->relative) { - node += strlen(get_implicit_path(conn)); - if (node[0] == '/') /* Could be "". */ - node++; - } - - /* Create reply from path and token */ - len = strlen(node) + 1 + strlen(event->watches[0]->token) + 1; - buffer = talloc_array(conn, char, len); - strcpy(buffer, node); - strcpy(buffer+strlen(node)+1, event->watches[0]->token); - send_reply(conn, XS_WATCH_EVENT, buffer, len); - talloc_free(buffer); -} - -static struct watch **find_watches(const char *node, bool recurse, - unsigned int *num) -{ - struct watch *i; - struct watch **ret = NULL; - - *num = 0; - - /* We include children too if this is an rm. */ - list_for_each_entry(i, &watches, list) { - if (is_child(node, i->node) || - (recurse && is_child(i->node, node))) { - (*num)++; - ret = talloc_realloc(node, ret, struct watch *, *num); - ret[*num - 1] = i; + list_for_each_entry(watch, &conn->watches, list) { + event = list_top(&watch->events, struct watch_event, list); + if (event) { + conn->waiting_for_ack = watch; + send_reply(conn,XS_WATCH_EVENT,event->data,event->len); + break; } } - return ret; } -/* FIXME: we fail to fire on out of memory. Should drop connections. */ -void fire_watches(struct transaction *trans, const char *node, bool recurse) +static int destroy_watch_event(void *_event) { - struct watch **watches; - struct watch_event *event; - unsigned int num_watches; + struct watch_event *event = _event; - /* During transactions, don't fire watches. */ - if (trans) - return; - - watches = find_watches(node, recurse, &num_watches); - if (!watches) - return; - - /* Create and fill in info about event. */ - event = talloc(talloc_autofree_context(), struct watch_event); - event->node = talloc_strdup(event, node); - - /* Tie event to this watch. 
*/ - event->watches = watches; - talloc_steal(event, watches); - event->num_watches = num_watches; - event->recurse = recurse; - list_add_tail(&event->list, &watches[0]->events); - - /* Warn if not finished after thirty seconds. */ - gettimeofday(&event->timeout, NULL); - event->timeout.tv_sec += 30; - - /* If connection not doing anything, queue this. */ - if (!watches[0]->conn->out) - queue_next_event(watches[0]->conn); + trace_destroy(event, "watch_event"); + assert(event->cause->watches_unacked != 0); + /* If it hits zero, will unblock in unblock_connections. */ + event->cause->watches_unacked--; + return 0; } -/* We're done with this event: see if anyone else wants it. */ -static void move_event_onwards(struct watch_event *event) +static void add_event(struct connection *cause, struct watch *watch, + const char *node) { - list_del(&event->list); + struct watch_event *event; - event->num_watches--; - event->watches++; - if (!event->num_watches) { - talloc_free(event); - return; + if (watch->relative_path) { + node += strlen(watch->relative_path); + if (*node == '/') /* Could be "" */ + node++; } - list_add_tail(&event->list, &event->watches[0]->events); - - /* If connection not doing anything, queue this. */ - if (!event->watches[0]->conn->out) - queue_next_event(event->watches[0]->conn); + event = talloc(watch, struct watch_event); + event->len = strlen(node) + 1 + strlen(watch->token) + 1; + event->data = talloc_array(event, char, event->len); + strcpy(event->data, node); + strcpy(event->data + strlen(node) + 1, watch->token); + event->cause = cause; + cause->watches_unacked++; + talloc_set_destructor(event, destroy_watch_event); + list_add_tail(&event->list, &watch->events); + trace_create(event, "watch_event"); } -static void remove_watch_from_events(struct watch *dying_watch) +/* FIXME: we fail to fire on out of memory. Should drop connections. 
*/ +bool fire_watches(struct connection *conn, const char *node, bool recurse) { + struct connection *i; struct watch *watch; - struct watch_event *event; - unsigned int i; - list_for_each_entry(watch, &watches, list) { - list_for_each_entry(event, &watch->events, list) { - for (i = 0; i < event->num_watches; i++) { - if (event->watches[i] != dying_watch) - continue; - - assert(i != 0); - memmove(event->watches+i, - event->watches+i+1, - (event->num_watches - (i+1)) - * sizeof(struct watch *)); - event->num_watches--; - } + /* During transactions, don't fire watches. */ + if (conn->transaction) + return false; + + assert(conn->state == OK); + + /* Create an event for each watch. Don't send to self. */ + list_for_each_entry(i, &connections, list) { + if (i == conn) + continue; + + list_for_each_entry(watch, &i->watches, list) { + if (is_child(node, watch->node)) + add_event(conn, watch, node); + else if (recurse && is_child(watch->node, node)) + add_event(conn, watch, watch->node); + else + continue; + conn->state = WATCHED; + /* If connection not doing anything, queue this. */ + if (!i->out) + queue_next_event(i); } } + return conn->state == WATCHED; } static int destroy_watch(void *_watch) { - struct watch *watch = _watch; - struct watch_event *event; - - /* If we have pending events, pass them on to others. */ - while ((event = list_top(&watch->events, struct watch_event, list))) - move_event_onwards(event); - - /* Remove from global list. */ - list_del(&watch->list); - - /* Other events which match this watch must be cleared. */ - remove_watch_from_events(watch); - - trace_destroy(watch, "watch"); + trace_destroy(_watch, "watch"); return 0; } -/* We keep watches in priority order. 
*/ -static void insert_watch(struct watch *watch) -{ - struct watch *i; - - list_for_each_entry(i, &watches, list) { - if (i->priority <= watch->priority) { - list_add_tail(&watch->list, &i->list); - return; - } - } - - list_add_tail(&watch->list, &watches); -} - void shortest_watch_ack_timeout(struct timeval *tv) { + (void)tv; +#if 0 /* FIXME */ struct watch *watch; list_for_each_entry(watch, &watches, list) { @@ -285,10 +178,12 @@ void shortest_watch_ack_timeout(struct timeval *tv) *tv = i->timeout; } } +#endif } void check_watch_ack_timeout(void) { +#if 0 struct watch *watch; struct timeval now; @@ -308,12 +203,13 @@ void check_watch_ack_timeout(void) } } } +#endif } void do_watch(struct connection *conn, struct buffered_data *in) { struct watch *watch; - char *vec[3]; + char *vec[2]; bool relative; if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) { @@ -331,14 +227,16 @@ void do_watch(struct connection *conn, struct buffered_data *in) watch = talloc(conn, struct watch); watch->node = talloc_strdup(watch, vec[0]); watch->token = talloc_strdup(watch, vec[1]); - watch->conn = conn; - watch->priority = strtoul(vec[2], NULL, 0); - watch->relative = relative; + if (relative) + watch->relative_path = get_implicit_path(conn); + else + watch->relative_path = NULL; + INIT_LIST_HEAD(&watch->events); - insert_watch(watch); - talloc_set_destructor(watch, destroy_watch); + list_add_tail(&watch->list, &conn->watches); trace_create(watch, "watch"); + talloc_set_destructor(watch, destroy_watch); send_ack(conn, XS_WATCH); } @@ -356,9 +254,6 @@ void do_watch_ack(struct connection *conn, const char *token) return; } - event = list_top(&conn->waiting_for_ack->events, - struct watch_event, list); - assert(event->watches[0] == conn->waiting_for_ack); if (!streq(conn->waiting_for_ack->token, token)) { /* They're confused: this will cause us to send event again */ conn->waiting_for_ack = NULL; @@ -366,7 +261,12 @@ void do_watch_ack(struct connection *conn, const char 
*token) return; } - move_event_onwards(event); + /* Remove event: after ack sent, core will call queue_next_event */ + event = list_top(&conn->waiting_for_ack->events, struct watch_event, + list); + list_del(&event->list); + talloc_free(event); + conn->waiting_for_ack = NULL; send_ack(conn, XS_WATCH_ACK); } @@ -385,11 +285,9 @@ void do_unwatch(struct connection *conn, struct buffered_data *in) * watch we're deleting: conn->waiting_for_ack was reset by * this command in consider_message anyway. */ node = canonicalize(conn, vec[0]); - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - + list_for_each_entry(watch, &conn->watches, list) { if (streq(watch->node, node) && streq(watch->token, vec[1])) { + list_del(&watch->list); talloc_free(watch); send_ack(conn, XS_UNWATCH); return; @@ -404,15 +302,16 @@ void dump_watches(struct connection *conn) struct watch *watch; struct watch_event *event; - /* Find first watch with an event. */ - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; + if (conn->waiting_for_ack) + printf(" waiting_for_ack for watch on %s token %s\n", + conn->waiting_for_ack->node, + conn->waiting_for_ack->token); - printf(" watch on %s token %s prio %i\n", - watch->node, watch->token, watch->priority); + list_for_each_entry(watch, &conn->watches, list) { + printf(" watch on %s token %s\n", + watch->node, watch->token); list_for_each_entry(event, &watch->events, list) - printf(" event: %s\n", event->node); + printf(" event: %s\n", event->data); } } #endif diff --git a/tools/xenstore/xenstored_watch.h b/tools/xenstore/xenstored_watch.h index c1ab41d866..d1fac70502 100644 --- a/tools/xenstore/xenstored_watch.h +++ b/tools/xenstore/xenstored_watch.h @@ -32,8 +32,10 @@ bool is_watch_event(struct connection *conn, struct buffered_data *out); /* Look through our watches: if any of them have an event, queue it. 
*/ void queue_next_event(struct connection *conn); -/* Fire all watches: recurse means all the children are effected (ie. rm) */ -void fire_watches(struct transaction *trans, const char *node, bool recurse); +/* Fire all watches: recurse means all the children are effected (ie. rm). + * Returns true if there were any, meaning connection has to wait. + */ +bool fire_watches(struct connection *conn, const char *node, bool recurse); /* Find shortest timeout: if any, reduce tv (may already be set). */ void shortest_watch_ack_timeout(struct timeval *tv); diff --git a/tools/xenstore/xs.c b/tools/xenstore/xs.c index c11e02ae1e..a1d667747a 100644 --- a/tools/xenstore/xs.c +++ b/tools/xenstore/xs.c @@ -401,22 +401,16 @@ unwind: /* Watch a node for changes (poll on fd to detect, or call read_watch()). * When the node (or any child) changes, fd will become readable. * Token is returned when watch is read, to allow matching. - * Priority indicates order if multiple watchers: higher is first. * Returns false on failure. */ -bool xs_watch(struct xs_handle *h, const char *path, const char *token, - unsigned int priority) +bool xs_watch(struct xs_handle *h, const char *path, const char *token) { - char prio[MAX_STRLEN(priority)]; - struct iovec iov[3]; + struct iovec iov[2]; - sprintf(prio, "%u", priority); iov[0].iov_base = (void *)path; iov[0].iov_len = strlen(path) + 1; iov[1].iov_base = (void *)token; iov[1].iov_len = strlen(token) + 1; - iov[2].iov_base = prio; - iov[2].iov_len = strlen(prio) + 1; return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); } diff --git a/tools/xenstore/xs.h b/tools/xenstore/xs.h index 8779a6b33a..1daf7b0150 100644 --- a/tools/xenstore/xs.h +++ b/tools/xenstore/xs.h @@ -82,11 +82,9 @@ bool xs_set_permissions(struct xs_handle *h, const char *path, /* Watch a node for changes (poll on fd to detect, or call read_watch()). * When the node (or any child) changes, fd will become readable. 
* Token is returned when watch is read, to allow matching. - * Priority indicates order if multiple watchers: higher is first. * Returns false on failure. */ -bool xs_watch(struct xs_handle *h, const char *path, const char *token, - unsigned int priority); +bool xs_watch(struct xs_handle *h, const char *path, const char *token); /* Return the FD to poll on to see if a watch has fired. */ int xs_fileno(struct xs_handle *h); diff --git a/tools/xenstore/xs_test.c b/tools/xenstore/xs_test.c index 6ce5d701af..71719365dd 100644 --- a/tools/xenstore/xs_test.c +++ b/tools/xenstore/xs_test.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -33,6 +34,10 @@ #define XSTEST static struct xs_handle *handles[10] = { NULL }; +static unsigned int children; + +static bool timeout = true; +static bool readonly = false; struct ringbuf_head { @@ -173,7 +178,9 @@ static void __attribute__((noreturn)) usage(void) " getperm \n" " setperm ...\n" " shutdown\n" - " watch \n" + " watch \n" + " async ...\n" + " asyncwait\n" " waitwatch\n" " ackwatch \n" " unwatch \n" @@ -186,22 +193,34 @@ static void __attribute__((noreturn)) usage(void) " dump\n"); } -static char *arg(char *line, unsigned int num) +static int argpos(const char *line, unsigned int num) { - static char *args[10]; - unsigned int i, len = 0; + unsigned int i, len = 0, off = 0; for (i = 0; i <= num; i++) { - line += len; - line += strspn(line, " \t\n"); - len = strcspn(line, " \t\n"); + off += len; + off += strspn(line + off, " \t\n"); + len = strcspn(line + off, " \t\n"); if (!len) - barf("Can't get arg %u", num); + return off; } + return off; +} + +static char *arg(char *line, unsigned int num) +{ + static char *args[10]; + unsigned int off, len; + + off = argpos(line, num); + len = strcspn(line + off, " \t\n"); + + if (!len) + barf("Can't get arg %u", num); free(args[num]); args[num] = malloc(len + 1); - memcpy(args[num], line, len); + memcpy(args[num], line+off, len); args[num][len] = '\0'; 
return args[num]; } @@ -360,10 +379,9 @@ static void do_shutdown(unsigned int handle) failed(handle); } -static void do_watch(unsigned int handle, const char *node, const char *token, - const char *pri) +static void do_watch(unsigned int handle, const char *node, const char *token) { - if (!xs_watch(handles[handle], node, token, atoi(pri))) + if (!xs_watch(handles[handle], node, token)) failed(handle); } @@ -388,6 +406,47 @@ static void do_ackwatch(unsigned int handle, const char *token) failed(handle); } +/* Async wait for watch on handle */ +static void do_command(unsigned int default_handle, char *line); +static void do_async(unsigned int handle, char *line) +{ + int child; + unsigned int i; + children++; + if ((child = fork()) != 0) + return; + + /* Don't keep other handles open in parent. */ + for (i = 0; i < ARRAY_SIZE(handles); i++) { + if (handles[i] && i != handle) { + xs_daemon_close(handles[i]); + handles[i] = NULL; + } + } + + do_command(handle, line + argpos(line, 1)); + exit(0); +} + +static void do_asyncwait(unsigned int handle) +{ + int status; + + if (handle) + barf("handle has no meaning with asyncwait"); + + if (children == 0) + barf("No children to wait for!"); + + if (waitpid(0, &status, 0) > 0) { + if (!WIFEXITED(status)) + barf("async died"); + if (WEXITSTATUS(status)) + exit(WEXITSTATUS(status)); + } + children--; +} + static void do_unwatch(unsigned int handle, const char *node, const char *token) { if (!xs_unwatch(handles[handle], node, token)) @@ -533,23 +592,106 @@ static void dump(int handle) free(subdirs); } +static int handle; + +static void alarmed(int sig __attribute__((unused))) +{ + if (handle) { + char handlename[10]; + sprintf(handlename, "%u:", handle); + write(STDOUT_FILENO, handlename, strlen(handlename)); + } + write(STDOUT_FILENO, command, strlen(command)); + write(STDOUT_FILENO, " timeout\n", strlen(" timeout\n")); + exit(1); +} + +static void do_command(unsigned int default_handle, char *line) +{ + char *endp; + + if 
(strspn(line, " \n") == strlen(line)) + return; + if (strstarts(line, "#")) + return; + + handle = strtoul(line, &endp, 10); + if (endp != line) + memmove(line, endp+1, strlen(endp)); + else + handle = default_handle; + + if (!handles[handle]) { + if (readonly) + handles[handle] = xs_daemon_open_readonly(); + else + handles[handle] = xs_daemon_open(); + if (!handles[handle]) + barf_perror("Opening connection to daemon"); + } + command = arg(line, 0); + + if (timeout) + alarm(5); + + if (streq(command, "dir")) + do_dir(handle, arg(line, 1)); + else if (streq(command, "read")) + do_read(handle, arg(line, 1)); + else if (streq(command, "write")) + do_write(handle, + arg(line, 1), arg(line, 2), arg(line, 3)); + else if (streq(command, "setid")) + do_setid(handle, arg(line, 1)); + else if (streq(command, "mkdir")) + do_mkdir(handle, arg(line, 1)); + else if (streq(command, "rm")) + do_rm(handle, arg(line, 1)); + else if (streq(command, "getperm")) + do_getperm(handle, arg(line, 1)); + else if (streq(command, "setperm")) + do_setperm(handle, arg(line, 1), line); + else if (streq(command, "shutdown")) + do_shutdown(handle); + else if (streq(command, "watch")) + do_watch(handle, arg(line, 1), arg(line, 2)); + else if (streq(command, "waitwatch")) + do_waitwatch(handle); + else if (streq(command, "async")) + do_async(handle, line); + else if (streq(command, "asyncwait")) + do_asyncwait(handle); + else if (streq(command, "ackwatch")) + do_ackwatch(handle, arg(line, 1)); + else if (streq(command, "unwatch")) + do_unwatch(handle, arg(line, 1), arg(line, 2)); + else if (streq(command, "close")) { + xs_daemon_close(handles[handle]); + handles[handle] = NULL; + } else if (streq(command, "start")) + do_start(handle, arg(line, 1)); + else if (streq(command, "commit")) + do_end(handle, false); + else if (streq(command, "abort")) + do_end(handle, true); + else if (streq(command, "introduce")) + do_introduce(handle, arg(line, 1), arg(line, 2), + arg(line, 3), arg(line, 4)); + else if 
(streq(command, "release")) + do_release(handle, arg(line, 1)); + else if (streq(command, "dump")) + dump(handle); + else if (streq(command, "sleep")) + sleep(atoi(arg(line, 1))); + else + barf("Unknown command %s", command); + fflush(stdout); + alarm(0); +} + int main(int argc, char *argv[]) { char line[1024]; - bool readonly = false, timeout = true; - int handle; - - static void alarmed(int sig __attribute__((unused))) - { - if (handle) { - char handlename[10]; - sprintf(handlename, "%u:", handle); - write(STDOUT_FILENO, handlename, strlen(handlename)); - } - write(STDOUT_FILENO, command, strlen(command)); - write(STDOUT_FILENO, " timeout\n", strlen(" timeout\n")); - exit(1); - } if (argc > 1 && streq(argv[1], "--readonly")) { readonly = true; @@ -557,7 +699,7 @@ int main(int argc, char *argv[]) argv++; } - if (argc > 1 && streq(argv[1], "--notimeout")) { + if (argc > 1 && streq(argv[1], "--no-timeout")) { timeout = false; argc--; argv++; @@ -570,81 +712,10 @@ int main(int argc, char *argv[]) ringbuf_datasize = getpagesize() / 2 - sizeof(struct ringbuf_head); signal(SIGALRM, alarmed); - while (fgets(line, sizeof(line), stdin)) { - char *endp; + while (fgets(line, sizeof(line), stdin)) + do_command(0, line); - if (strspn(line, " \n") == strlen(line)) - continue; - if (strstarts(line, "#")) - continue; - - handle = strtoul(line, &endp, 10); - if (endp != line) - memmove(line, endp+1, strlen(endp)); - else - handle = 0; - - if (!handles[handle]) { - if (readonly) - handles[handle] = xs_daemon_open_readonly(); - else - handles[handle] = xs_daemon_open(); - if (!handles[handle]) - barf_perror("Opening connection to daemon"); - } - command = arg(line, 0); - - if (timeout) - alarm(5); - if (streq(command, "dir")) - do_dir(handle, arg(line, 1)); - else if (streq(command, "read")) - do_read(handle, arg(line, 1)); - else if (streq(command, "write")) - do_write(handle, - arg(line, 1), arg(line, 2), arg(line, 3)); - else if (streq(command, "setid")) - do_setid(handle, 
arg(line, 1)); - else if (streq(command, "mkdir")) - do_mkdir(handle, arg(line, 1)); - else if (streq(command, "rm")) - do_rm(handle, arg(line, 1)); - else if (streq(command, "getperm")) - do_getperm(handle, arg(line, 1)); - else if (streq(command, "setperm")) - do_setperm(handle, arg(line, 1), line); - else if (streq(command, "shutdown")) - do_shutdown(handle); - else if (streq(command, "watch")) - do_watch(handle, arg(line, 1), arg(line, 2), arg(line, 3)); - else if (streq(command, "waitwatch")) - do_waitwatch(handle); - else if (streq(command, "ackwatch")) - do_ackwatch(handle, arg(line, 1)); - else if (streq(command, "unwatch")) - do_unwatch(handle, arg(line, 1), arg(line, 2)); - else if (streq(command, "close")) { - xs_daemon_close(handles[handle]); - handles[handle] = NULL; - } else if (streq(command, "start")) - do_start(handle, arg(line, 1)); - else if (streq(command, "commit")) - do_end(handle, false); - else if (streq(command, "abort")) - do_end(handle, true); - else if (streq(command, "introduce")) - do_introduce(handle, arg(line, 1), arg(line, 2), - arg(line, 3), arg(line, 4)); - else if (streq(command, "release")) - do_release(handle, arg(line, 1)); - else if (streq(command, "dump")) - dump(handle); - else if (streq(command, "sleep")) - sleep(atoi(arg(line, 1))); - else - barf("Unknown command %s", command); - fflush(stdout); - alarm(0); - } + while (children) + do_asyncwait(0); return 0; } -- 2.30.2